package com.example.container;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * Client-side service registry backed by ZooKeeper (via Curator).
 *
 * <p>Every child node under the {@code services} namespace represents one service instance.
 * The node data has the form {@code host:port} optionally followed by {@code -<millis>},
 * where {@code <millis>} is the wall-clock time at which the instance was last handed out by
 * {@link #getConnection()}.
 *
 * <p>The connection cache is a synchronized map because it is written from the Curator
 * listener thread and read from whichever thread calls {@link #getConnection()}.
 */
public class ServiceCentre implements AutoCloseable {

    /** A last-selected timestamp older than this many milliseconds is treated as 0. */
    private static final long IDLE_RESET_MILLIS = 5000L;

    private final String zkUrl;
    private CuratorFramework client;
    /** Service node name -> connection object returned by {@link Subscribe#addSubscribe}. */
    private final Map<String, Object> connectionCache =
            Collections.synchronizedMap(new HashMap<String, Object>());
    /** Caches started by {@link #subscribe}; kept so that {@link #close()} can release them. */
    private final List<PathChildrenCache> childrenCaches = new ArrayList<>();

    /**
     * Creates the registry and immediately starts the ZooKeeper client.
     *
     * @param zkUrl ZooKeeper connect string
     */
    public ServiceCentre(String zkUrl) {
        this.zkUrl = zkUrl;
        init();
    }

    /**
     * Builds and starts the Curator client against {@code zkUrl}, using the
     * {@code services} namespace and an exponential back-off retry policy.
     *
     * <p>Note: calling this again replaces the client without closing the previous one.
     */
    public void init() {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(zkUrl)
                .retryPolicy(retryPolicy)
                .namespace("services")
                .build();
        client.start();
    }

    /**
     * Subscribes to every service currently registered and then watches for services
     * coming online or going offline.
     *
     * @param subscribe   callback invoked with the host and port of each service that is
     *                    (or comes) online; its return value is cached as the connection
     * @param unsubscribe callback invoked with the cached connection of a service whose
     *                    node was removed
     * @throws Exception if ZooKeeper cannot be read, the watcher cannot be started, or the
     *                   data of an existing node is not of the form {@code host:port[-millis]}
     */
    public void subscribe(Subscribe subscribe, Unsubscribe unsubscribe) throws Exception {
        List<String> services = client.getChildren().forPath("/");
        // Subscribe to the services that already exist.
        for (String server : services) {
            byte[] bytes = client.getData().forPath("/" + server);
            ServiceRecord record = ServiceRecord.parse(new String(bytes, StandardCharsets.UTF_8));
            Object connection = subscribe.addSubscribe(record.host, record.port);
            connectionCache.put(server, connection);
            System.out.println("subscribe to: " + record.host + ":" + record.port);
        }
        // Watch for services going online/offline.
        // Third argument (cacheData = true): node content is available on the event.
        PathChildrenCache cache = new PathChildrenCache(client, "/", true);
        PathChildrenCacheListener childrenCacheListener = (curator, event) -> {
            System.out.println("children lister " + event.getType());
            // Connection-state events (suspended, reconnected, lost, ...) carry no node data.
            if (event.getData() == null) {
                return;
            }
            // Strip the leading slash to obtain the node name.
            String server = event.getData().getPath().replaceFirst("/", "");
            switch (event.getType()) {
                case CHILD_ADDED: {
                    System.out.println("server name " + server);
                    // The cache also reports nodes that existed before it was started; those
                    // were subscribed above and must not be subscribed a second time.
                    if (connectionCache.containsKey(server)) {
                        break;
                    }
                    String data = new String(event.getData().getData(), StandardCharsets.UTF_8);
                    // Same parser as the initial loop, so a "-millis" suffix is tolerated.
                    ServiceRecord record = ServiceRecord.parse(data);
                    Object connection = subscribe.addSubscribe(record.host, record.port);
                    connectionCache.put(server, connection);
                    System.out.println("subscribe to " + record.host + ":" + record.port);
                    break;
                }
                case CHILD_REMOVED: {
                    System.out.println("unsubscribe server: " + server);
                    Object removed = connectionCache.remove(server);
                    // Nothing to release when no connection was cached for this node.
                    if (removed != null) {
                        unsubscribe.onUnsubscribe(removed);
                    }
                    break;
                }
                default:
                    break;
            }
        };
        cache.getListenable().addListener(childrenCacheListener);
        // Register before starting so close() releases the cache even if start() fails.
        synchronized (childrenCaches) {
            childrenCaches.add(cache);
        }
        cache.start();
    }

    /**
     * Picks a cached connection, preferring the service that was selected least recently.
     *
     * <p>For every cached service the last-selected timestamp is read from its node; a
     * timestamp older than five seconds (or absent) counts as 0. The service with the
     * smallest value wins; the candidate order is shuffled first so that ties are broken
     * randomly. The chosen node is then stamped with the current time. Services whose node
     * cannot be read or parsed are skipped.
     *
     * @return the cached connection of the chosen service; may be {@code null} if that
     *         service was removed concurrently
     * @throws RuntimeException if no service is cached or none of the cached services
     *                          could be read from ZooKeeper
     */
    public Object getConnection() {
        List<String> serviceNames = new ArrayList<>(connectionCache.keySet());
        if (serviceNames.isEmpty()) {
            throw new RuntimeException("Can not get connected service");
        }
        // Shuffle so that services with equal timestamps are not always picked in the same order.
        Collections.shuffle(serviceNames);

        String chosenName = null;
        ServiceRecord chosenRecord = null;
        long chosenTime = Long.MAX_VALUE;
        Exception lastFailure = null;
        for (String serverName : serviceNames) {
            try {
                String data = new String(client.getData().forPath("/" + serverName), StandardCharsets.UTF_8);
                ServiceRecord record = ServiceRecord.parse(data);
                long repTime = record.lastSelectedMillis();
                // Selected more than five seconds ago: treat as idle.
                if (System.currentTimeMillis() - repTime > IDLE_RESET_MILLIS) {
                    repTime = 0L;
                }
                if (chosenName == null || repTime < chosenTime) {
                    chosenName = serverName;
                    chosenRecord = record;
                    chosenTime = repTime;
                }
            } catch (Exception e) {
                lastFailure = e;
                e.printStackTrace();
            }
        }
        if (chosenName == null) {
            // Every lookup failed; report that instead of an unrelated NoSuchElementException.
            throw new RuntimeException("Can not get connected service", lastFailure);
        }

        // Stamp the chosen service with the current time, keeping its address untouched.
        try {
            String newData = chosenRecord.address + "-" + System.currentTimeMillis();
            client.setData().forPath("/" + chosenName, newData.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            // Best effort: the connection is still usable even if the stamp could not be written.
            e.printStackTrace();
        }
        return connectionCache.get(chosenName);
    }

    /**
     * Stops all watchers started by {@link #subscribe} and closes the ZooKeeper client.
     * Cached connections are not touched; they belong to the caller.
     */
    @Override
    public void close() {
        List<PathChildrenCache> toClose;
        synchronized (childrenCaches) {
            toClose = new ArrayList<>(childrenCaches);
            childrenCaches.clear();
        }
        for (PathChildrenCache cache : toClose) {
            try {
                cache.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (client != null) {
            client.close();
        }
    }

    /** Parsed form of a service node's data: {@code host:port[-millis]}. */
    private static final class ServiceRecord {
        /** The {@code host:port} part exactly as stored, without the timestamp suffix. */
        final String address;
        final String host;
        final int port;
        /** Raw text after the dash that follows the port; empty when there is none. */
        final String suffix;

        private ServiceRecord(String address, String host, int port, String suffix) {
            this.address = address;
            this.host = host;
            this.port = port;
            this.suffix = suffix;
        }

        /**
         * Splits on the last colon first, so that hyphens inside the host name are not
         * mistaken for the timestamp separator.
         *
         * @throws IllegalArgumentException if there is no {@code host:} part or the port
         *                                  is not an integer
         */
        static ServiceRecord parse(String data) {
            int colon = data.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException(
                        "Malformed service data, expected host:port[-millis]: " + data);
            }
            int dash = data.indexOf('-', colon + 1);
            String address = dash < 0 ? data : data.substring(0, dash);
            String suffix = dash < 0 ? "" : data.substring(dash + 1);
            String host = address.substring(0, colon);
            int port = Integer.parseInt(address.substring(colon + 1));
            return new ServiceRecord(address, host, port, suffix);
        }

        /**
         * @return the stored timestamp, or 0 when the data has none
         * @throws NumberFormatException if the suffix is present but not a number
         */
        long lastSelectedMillis() {
            return suffix.isEmpty() ? 0L : Long.parseLong(suffix);
        }
    }

    /** Callback invoked when a service goes offline. */
    public interface Unsubscribe {
        void onUnsubscribe(Object connection);
    }

    /** Callback invoked when a service comes online; returns the connection to cache. */
    public interface Subscribe {
        Object addSubscribe(String host, int port);
    }


}
